add query cancel sync for presto forward #41

Open
wants to merge 1 commit into
base: main

yabinma
Member

yabinma commented Feb 7, 2025

Background:
Presto forward is a feature that monitors the workload on a source Presto cluster and clones that workload to target clusters.

It works well. There is one new requirement: synchronize query cancellation to the target clusters. That is, when a query is canceled in the source cluster, the target clusters should cancel their copy of it as well.

Solution:

  1. Maintain a query cache that records, for each forwarded query, the queryId in both the source and target clusters and the nextUri used to cancel the query. When a query is started on a target cluster, it is added to the cache; once the query finishes executing, it is removed from the cache.
  2. On the next sync, check whether any query has been canceled in the source cluster. If so, look it up in the cache. If it is found, it is still running on the target clusters, so call the API to cancel it.
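
The two steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in this PR: `queryCache`, `cacheEntry`, `canceler`, and `fakeClient` are hypothetical names, the mutex is an assumption about how concurrent access might be guarded, and the URI is a placeholder. In the real tool the cancel call would go through the Presto client using the stored nextUri.

```go
package main

import (
	"fmt"
	"sync"
)

// canceler is a hypothetical stand-in for a target-cluster client.
type canceler interface {
	Cancel(nextURI string) error
}

// cacheEntry records one forwarded copy of a source query.
type cacheEntry struct {
	NextURI string   // URI used to cancel the query on the target cluster
	Client  canceler // client for the target cluster running the copy
}

// queryCache maps a source-cluster query ID to the forwarded copies
// that are still running on target clusters.
type queryCache struct {
	mu      sync.Mutex
	entries map[string][]cacheEntry
}

func newQueryCache() *queryCache {
	return &queryCache{entries: make(map[string][]cacheEntry)}
}

// Add implements step 1: remember a query once it starts on a target cluster.
func (c *queryCache) Add(sourceID string, e cacheEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[sourceID] = append(c.entries[sourceID], e)
}

// Remove drops a source query once its forwarded copies have completed.
func (c *queryCache) Remove(sourceID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, sourceID)
}

// CancelIfRunning implements step 2: if the canceled source query is still
// cached, cancel every forwarded copy. It returns the number of successful
// cancel calls.
func (c *queryCache) CancelIfRunning(sourceID string) int {
	c.mu.Lock()
	entries := c.entries[sourceID]
	delete(c.entries, sourceID)
	c.mu.Unlock()

	cancelled := 0
	for _, e := range entries {
		if err := e.Client.Cancel(e.NextURI); err == nil {
			cancelled++
		}
	}
	return cancelled
}

// fakeClient records the URIs it was asked to cancel (for demonstration only).
type fakeClient struct{ cancelled []string }

func (f *fakeClient) Cancel(nextURI string) error {
	f.cancelled = append(f.cancelled, nextURI)
	return nil
}

func main() {
	cache := newQueryCache()
	target := &fakeClient{}
	cache.Add("source-query-1", cacheEntry{NextURI: "placeholder-next-uri", Client: target})

	fmt.Println(cache.CancelIfRunning("source-query-1")) // 1: the copy was still cached
	fmt.Println(cache.CancelIfRunning("source-query-1")) // 0: nothing left to cancel
}
```

Removing the entry before issuing the cancel calls keeps a later sync from trying to cancel the same query twice.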

yabinma requested a review from ethanyzhang as a code owner February 7, 2025 19:35
queryStateFailed = "FAILED"
)

type QueryCache struct {
Collaborator

Suggested change
- type QueryCache struct {
+ type QueryCacheEntry struct {

to be more precise

runningTasks sync.WaitGroup
failedToForward atomic.Uint32
forwarded atomic.Uint32
runningQueriesCacheMap = make(map[string]*[]QueryCache)
Collaborator

Please add a comment here explaining what the key and the value of this map represent.
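
For illustration, a documented declaration might look like the sketch below. The key semantics are an assumption inferred from the lookup by `queryState.QueryId` elsewhere in this diff, and `QueryCacheEntry` is reduced here to a single field; the actual type in the PR carries more.

```go
package main

import "fmt"

// QueryCacheEntry is a simplified stand-in for the PR's cache entry type.
type QueryCacheEntry struct {
	NextUri string // URI used to cancel the forwarded query on a target cluster
}

// runningQueriesCacheMap tracks forwarded queries that are still running.
// Key: the query ID in the source cluster (assumed).
// Value: one entry per target cluster the query was forwarded to.
var runningQueriesCacheMap = make(map[string][]QueryCacheEntry)

func main() {
	runningQueriesCacheMap["source-query-1"] = []QueryCacheEntry{{NextUri: "placeholder-next-uri"}}
	entries, ok := runningQueriesCacheMap["source-query-1"]
	fmt.Println(ok, len(entries))
}
```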

@@ -171,6 +187,16 @@ func Run(_ *cobra.Command, _ []string) {
Msgf("finished forwarding queries")
}

func checkAndCancelQuery(ctx context.Context, queryState *presto.QueryStateInfo) {
if queryCaches, ok := runningQueriesCacheMap[queryState.QueryId]; ok {
Collaborator

Suggested change
- if queryCaches, ok := runningQueriesCacheMap[queryState.QueryId]; ok {
+ if queryCacheEntries, ok := runningQueriesCacheMap[queryState.QueryId]; ok {

@@ -226,9 +252,10 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
}
successful, failed := atomic.Uint32{}, atomic.Uint32{}
forwardedQueries := sync.WaitGroup{}
cachedQueries := []QueryCache{}
Collaborator

Suggested change
- cachedQueries := []QueryCache{}
+ cachedQueries := make([]QueryCache, len(clients)-1)

Pre-allocate the elements to prevent race conditions.
See my comment below.

)

type QueryCache struct {
QueryId string
Collaborator

It seems we do not need the QueryId field here, right?

@@ -246,6 +273,10 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
failed.Add(1)
return
}
//build cache for running query
if clientResult.NextUri != nil {
*cachedQueries = append(*cachedQueries, QueryCache{QueryId: clientResult.Id, NextUri: *clientResult.NextUri, Client: client})
Collaborator

Here you write to cachedQueries concurrently, which can cause a data race.
In the comment above I asked you to pre-allocate the elements, so here you can just set the element at index i-1 instead of reassigning cachedQueries with append.
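
A rough sketch of the pre-allocation pattern, under simplifying assumptions: `forwardAll` and the plain `targets` slice are hypothetical, and the entry type is reduced to one field. The PR itself sizes the slice as `len(clients)-1` and writes at `i-1`, which is mirrored here by giving each goroutine its own index.

```go
package main

import (
	"fmt"
	"sync"
)

// QueryCacheEntry is a simplified stand-in for the PR's cache entry type.
type QueryCacheEntry struct {
	NextUri string
}

// forwardAll starts one goroutine per target and collects one entry per target.
// The slice is allocated up front, so goroutines never call append on shared state.
func forwardAll(targets []string) []QueryCacheEntry {
	entries := make([]QueryCacheEntry, len(targets))
	var wg sync.WaitGroup
	for i, target := range targets {
		wg.Add(1)
		go func(i int, target string) {
			defer wg.Done()
			// Each goroutine writes only to its own slot, so no lock is needed.
			entries[i] = QueryCacheEntry{NextUri: target + "/next"}
		}(i, target)
	}
	wg.Wait() // all writes complete before the slice is read
	return entries
}

func main() {
	for _, e := range forwardAll([]string{"target-a", "target-b"}) {
		fmt.Println(e.NextUri)
	}
}
```

One consequence of pre-allocating: a slot stays zero-valued when a forward fails or returns no NextUri, so the code that later reads the slice has to skip empty entries.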
